package com.zk.algorithm.sort;

/**
 * 多路磁盘归并排序，参考微信公众号 码洞
 * https://mp.weixin.qq.com/s?timestamp=1547737732&src=3&ver=1&signature=QvQWONNMCGe71gyH0q*xBvUhJGGT8IG5O0c90pTUHFfcpEWhoIvOFi6ERZc1dH7432YhVVs*aHdUmc20E7NR6-3V-7pAc9uy*Oyw33G0JXfIjGOsaS*AePrUwiyVAUQjEI14pegqRh1KPUbdfXLRulyL1*IYlQbe7bpYCsy-mfY=
 *
 * 整体思路就是多个文件多个指针，标记各个文件读取到哪一行了:
 *
 * 文件一:
 * 1
 * 2 <-- 读到这一行了
 * 3
 * 4
 * 5
 *
 * 文件二：
 * 2
 * 4 <-- 读到这一行了
 * 5
 * 7
 * 8
 *
 * 文件三：
 * 5 <-- 读到这一行了
 * 6
 * 9
 * 10
 * 15
 *
 * @author zk
 */

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class DiskMergeSort implements Closeable {

    /**
     * @param n 生成多少个文件
     * @param minEntries 每个文件 “最少” 多少个数
     * @param maxEntries 每个文件 “最多” 多少个数
     * @return
     */
    public static List<String> generateFiles(int n, int minEntries, int maxEntries) {
        List<String> files = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            String filename = "input-" + i + ".txt";
            PrintWriter writer;
            try {
                writer = new PrintWriter(new FileOutputStream(filename));
                int entries = ThreadLocalRandom.current().nextInt(minEntries, maxEntries);
                List<Integer> nums = new ArrayList<>();
                for (int k = 0; k < entries; k++) {
                    int num = ThreadLocalRandom.current().nextInt(10000000);
                    nums.add(num);
                }
                Collections.sort(nums);
                for (int num : nums) {
                    writer.println(num);
                }
                writer.close();
            } catch (FileNotFoundException e) {
            }
            files.add(filename);
        }
        return files;
    }

    private List<MergeSource> sources;
    private MergeOut out;

    public DiskMergeSort(List<String> files, String outFilename) {
        this.sources = new ArrayList<>();
        for (String filename : files) {
            this.sources.add(new MergeSource(filename));
        }
        this.out = new MergeOut(outFilename);
    }

    static class MergeOut implements Closeable {
        private PrintWriter writer;

        public MergeOut(String filename) {
            try {
                this.writer = new PrintWriter(new FileOutputStream(filename));
            } catch (FileNotFoundException e) {
            }
        }

        public void write(Bin bin) {
            writer.println(bin.num);
        }

        @Override
        public void close() throws IOException {
            writer.flush();
            writer.close();
        }
    }

    static class MergeSource implements Closeable {
        private BufferedReader reader;
        private String cachedLine;

        public MergeSource(String filename) {
            try {
                FileReader fr = new FileReader(filename);
                this.reader = new BufferedReader(fr);
            } catch (FileNotFoundException e) {
            }
        }

        public boolean hasNext() {
            String line;
            try {
                line = this.reader.readLine();
                if (line == null || line.isEmpty()) {
                    return false;
                }
                this.cachedLine = line.trim();
                return true;
            } catch (IOException e) {
            }
            return false;
        }

        /**
         * 当前输入文件的最小元素，因为每一个文件都是有序的。
         *
         * 1
         * 2
         * 3 <-- nextPointer
         * 4
         * 5
         *
         * @return 3
         */
        public int next() {
            if (this.cachedLine == null) {
                if (!hasNext()) {
                    throw new IllegalStateException("no content");
                }
            }
            int num = Integer.parseInt(this.cachedLine);
            this.cachedLine = null;
            return num;
        }

        @Override
        public void close() throws IOException {
            this.reader.close();
        }
    }

    static class Bin implements Comparable<Bin> {
        int num;
        MergeSource source;

        Bin(MergeSource source, int num) {
            this.source = source;
            this.num = num;
        }

        @Override
        public int compareTo(Bin o) {
            return this.num - o.num;
        }
    }

    public List<Bin> prepare() {
        List<Bin> bins = new ArrayList<>();
        for (MergeSource source : sources) {
            Bin newBin = new Bin(source, source.next());
            bins.add(newBin);
        }

        // 对多个文件的最小元素进行排序
        Collections.sort(bins);
        return bins;
    }

    public void sort() {
        List<Bin> bins = prepare();
        while (true) {
            // 最小的值所在的文件
            MergeSource current = bins.get(0).source;
            // 最小的文件继续读下一行
            if (current.hasNext()) {
                Bin newBin = new Bin(current, current.next());
                int index = Collections.binarySearch(bins, newBin);

                // 仍然是最小的
                if (index == 0 || index == -1) {
                    this.out.write(newBin);
                    if (index == -1) {
                        throw new IllegalStateException("impossible");
                    }
                } else {
                    if (index < 0) {
                        index = -index - 1;
                    }

                    // 插入
                    bins.add(index, newBin);

                    // 第一个是最小的
                    Bin minBin = bins.remove(0);
                    this.out.write(minBin);
                }
            } else {
                // 当前的文件已经读空了
                Bin minBin = bins.remove(0);
                this.out.write(minBin);
                if (bins.isEmpty()) {
                    break;
                }
            }
        }
    }

    @Override
    public void close() throws IOException {
        for (MergeSource source : sources) {
            source.close();
        }
        this.out.close();
    }

    public static void main(String[] args) throws IOException {
        List<String> inputs = DiskMergeSort.generateFiles(100, 10000, 20000);
        // 运行多次看算法耗时
        for (int i = 0; i < 20; i++) {
            DiskMergeSort sorter = new DiskMergeSort(inputs, "output.txt");
            long start = System.currentTimeMillis();
            sorter.sort();
            long duration = System.currentTimeMillis() - start;
            System.out.printf("%dms\n", duration);
            sorter.close();
        }
    }
}
